=begin pod

=TITLE class Channel

=SUBTITLE Thread-safe queue for sending values from producers to consumers

    class Channel {}

A C<Channel> is a thread-safe queue that lets you send a series of objects from
one or more producers to one or more consumers.  Each object arrives at exactly
one consumer, selected by the scheduler.  If there is only one consumer
and one producer, the order of objects is guaranteed to be preserved.  Sending
on a C<Channel> is non-blocking.

    my $c = Channel.new;
    await (^10).map: {
        start {
            my $r = rand;
            sleep $r;
            $c.send($r);
        }
    }
    $c.close;
    say $c.list;
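
As a sketch of the several-consumers case described above, the following starts
two consumers that each iterate the same channel and count what they receive.
How the items are split between the two is up to the scheduler, so only the
total is predictable. The variable names are illustrative.

    my $c = Channel.new;
    my @consumers = (^2).map: {
        start {
            my $count = 0;
            # Iteration blocks while the channel is empty and ends once it is closed
            $count++ for $c.list;
            $count;
        }
    }
    $c.send($_) for 1..10;
    $c.close;
    my $total = (await @consumers).sum;
    say $total; # OUTPUT: «10␤»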

Further examples can be found in the L<concurrency page|/language/concurrency#Channels>.

=head1 Methods

=head2 method send

Defined as:

    method send(Channel:D: \item)

Enqueues an item into the C<Channel>. Throws an exception of type
L<X::Channel::SendOnClosed> if the channel has been closed already.
This call will B<not> block waiting for a consumer to take the object.
There is no set limit on the number of items that may be queued, so
care should be taken to prevent runaway queueing.

    my $c = Channel.new;
    $c.send(1);
    $c.send([2, 3, 4, 5]);
    $c.close;
    say $c.list; # OUTPUT: «(1 [2 3 4 5])␤»
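
To illustrate that C<send> does not wait for a consumer, this sketch queues a
thousand items before anything reads from the channel. The count of 1000 is
arbitrary and only chosen for the example.

    my $c = Channel.new;
    $c.send($_) for ^1000;      # nothing is receiving yet, but no call blocks
    $c.close;
    my $queued = $c.list.elems;
    say $queued; # OUTPUT: «1000␤»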

=head2 method receive

Defined as:

    method receive(Channel:D:)

Receives and removes an item from the channel. It blocks if no item is
present, waiting for a C<send> from another thread.

Throws an exception of type L<X::Channel::ReceiveOnClosed> if the channel has
been closed and the last item has already been removed, or if C<close> is
called while C<receive> is waiting for an item to arrive.

If the channel has been marked as erratic with method C<fail>, and the last
item has been removed, throws the argument that was given to C<fail> as an
exception.

See method C<poll> for a non-blocking version that won't throw exceptions.

    my $c = Channel.new;
    $c.send(1);
    say $c.receive; # OUTPUT: «1␤»
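
The blocking behavior and the exception on a closed, empty channel can be seen
together in a small sketch. The half-second delay and the value C<'late'> are
made up for the example.

    my $c = Channel.new;
    start { sleep 0.5; $c.send('late'); $c.close }
    my $item = $c.receive;      # blocks until the other thread sends
    say $item; # OUTPUT: «late␤»
    try $c.receive;             # the channel is closed and empty by now
    my $error = $!.^name;
    say $error; # OUTPUT: «X::Channel::ReceiveOnClosed␤»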

=head2 method poll

Defined as:

    method poll(Channel:D:)

Receives and removes an item from the channel. If no item is present, returns
C<Nil> instead of waiting.

    my $c = Channel.new;
    Promise.in(2).then: { $c.close; }
    ^10 .map({ $c.send($_); });
    loop {
        # `with` tests definedness, so a false value such as 0 is still printed
        with $c.poll -> $item { $item.say };
        if $c.closed  { last };
        sleep 0.1;
    }

See method C<receive> for a blocking version that properly responds to channel
closing and failure.
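
One possible use of C<poll> is to drain whatever is queued right now without
waiting for more. The sketch below tests the result with C<.defined>, so it
assumes that no undefined values are sent on the channel.

    my $c = Channel.new;
    $c.send($_) for 0..3;
    my @batch;
    while (my $item = $c.poll).defined {
        @batch.push: $item;
    }
    say @batch; # OUTPUT: «[0 1 2 3]␤»
    say $c.poll.defined; # OUTPUT: «False␤»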

=head2 method close

Defined as:

    method close(Channel:D:)

Closes the C<Channel> normally.  This makes subsequent C<send> calls die with
L<X::Channel::SendOnClosed>.  Subsequent calls of C<.receive> may still drain
any remaining items that were previously sent, but once the queue is empty they
will throw an L<X::Channel::ReceiveOnClosed> exception.  A C<Seq> produced by
C<@()> or by the C<.list> method will not terminate until the channel has been
closed.

    my $c = Channel.new;
    $c.close;
    $c.send(1);
    CATCH { default { put .^name, ': ', .Str } };
    # OUTPUT: «X::Channel::SendOnClosed: Cannot send a message on a closed channel␤»

Note that an exception thrown before C<.close> is reached may prevent it from
being called, which can hang the receiving thread. Use a
L<LEAVE|/language/phasers#LEAVE> phaser to ensure that C<.close> is called in
this case.
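
A minimal sketch of that pattern follows. The producer dies part-way through
(the failure is contrived for the example), yet the C<LEAVE> phaser still
closes the channel, so the consumer's iteration ends instead of hanging.

    my $c = Channel.new;
    my $producer = start {
        LEAVE $c.close;         # runs however the block is left
        for 1..5 {
            die 'producer failed' if $_ == 4;   # hypothetical failure
            $c.send($_);
        }
    }
    my @received = $c.list;     # ends because the channel gets closed
    say @received; # OUTPUT: «[1 2 3]␤»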

=head2 method list

Defined as:

    method list(Channel:D: --> List:D)

Returns a list based on the C<Seq> which iterates over the items in the queue,
removing each item from the queue as it goes. Iteration can only terminate once
the C<close> method has been called.

    my $c = Channel.new; $c.send(1); $c.send(2);
    $c.close;
    say $c.list; # OUTPUT: «(1 2)␤»
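
The list can also be consumed while a producer is still sending. In this
sketch (the delays are arbitrary) the C<for> loop waits for each item and
finishes once the producer closes the channel.

    my $c = Channel.new;
    start {
        for 1..3 { sleep 0.1; $c.send($_) }
        $c.close;
    }
    my @log;
    for $c.list -> $item { @log.push: "got $item" }
    say @log.join(', '); # OUTPUT: «got 1, got 2, got 3␤»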

=head2 method closed

Defined as:

    method closed(Channel:D: --> Promise:D)

Returns a promise that will be kept once the channel is closed by a call to
method C<close>.

    my $c = Channel.new;
    $c.closed.then({ say "It's closed!" });
    $c.close;
    sleep 1;
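
Because a C<Promise> is true in Boolean context once it is no longer planned,
the result of C<closed> can also be tested directly. A small sketch with an
empty channel:

    my $c = Channel.new;
    my $before = ?$c.closed;
    $c.close;
    my $after = ?$c.closed;
    say "$before $after"; # OUTPUT: «False True␤»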

=head2 method fail

Defined as:

    method fail(Channel:D: $error)

Closes the C<Channel> (that is, makes subsequent C<send> calls die), and enqueues
the error to be thrown as the final element in the channel. Method C<receive>
will throw that error as an exception.  Does nothing if the channel has already
been closed or C<.fail> has already been called on it.

    my $c = Channel.new;
    $c.fail("Bad error happens!");
    $c.receive;
    CATCH { default { put .^name, ': ', .Str } };
    # OUTPUT: «X::AdHoc: Bad error happens!␤»
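
Since the error is enqueued as the final element, items sent before the call to
C<fail> can still be received first. A sketch, with a made-up error message:

    my $c = Channel.new;
    $c.send(1);
    $c.fail('disk full');       # hypothetical error message
    my $first = $c.receive;
    say $first; # OUTPUT: «1␤»
    try $c.receive;             # the queue now holds only the error
    my $message = $!.message;
    say $message; # OUTPUT: «disk full␤»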

=head2 method Capture

Defined as:

    method Capture(Channel:D: --> Capture:D)

Equivalent to calling L«C<.List.Capture>|/type/List#method_Capture»
on the invocant.
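
As an illustration, the resulting C<Capture> can be flattened into an argument
list. The helper C<sum3> is hypothetical, and the channel is closed first
because building the list otherwise would not finish.

    sub sum3($x, $y, $z) { $x + $y + $z }   # hypothetical helper
    my $c = Channel.new;
    $c.send($_) for 1, 2, 3;
    $c.close;
    my $sum = sum3(|$c.Capture);
    say $sum; # OUTPUT: «6␤»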

=head2 method Supply

Defined as:

    method Supply(Channel:D:)

Returns an C<on-demand> L<Supply> that emits a value for every value
received on the C<Channel>. C<done> will be called on the C<Supply> when the
C<Channel> is closed.

    my $c = Channel.new;
    my Supply $s1 = $c.Supply;
    my Supply $s2 = $c.Supply;
    $s1.tap(-> $v { say "First $v" });
    $s2.tap(-> $v { say "Second $v" });
    ^10 .map({ $c.send($_) });
    sleep 1;

Multiple calls to this method produce multiple instances of C<Supply>, which
compete over the values from the C<Channel>.
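
With a single tap, a C<done> callback offers a way to wait for the channel to
be closed rather than sleeping. This is a sketch; C<$finished> and C<@seen> are
names introduced for the example.

    my $c = Channel.new;
    my @seen;
    my $finished = Promise.new;
    $c.Supply.tap(-> $v { @seen.push: $v }, done => { $finished.keep });
    $c.send($_) for 1..3;
    $c.close;
    await $finished;            # kept by the done callback
    say @seen; # OUTPUT: «[1 2 3]␤»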

=head2 sub await

Defined as:

    multi sub await(Channel:D)
    multi sub await(*@)

Waits until each of one or more channels has a value available, and returns
those values (by calling C<.receive> on each channel). Also works with
L<promises|/type/Promise>.

    my $c = Channel.new;
    Promise.in(1).then({$c.send(1)});
    say await $c;
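
With several channels, the values come back in the order the channels were
passed, not the order in which they became available. A sketch with arbitrary
delays:

    my $c1 = Channel.new;
    my $c2 = Channel.new;
    start { sleep 0.2; $c1.send('x') }
    start { sleep 0.1; $c2.send('y') }
    my ($first, $second) = await $c1, $c2;
    say "$first $second"; # OUTPUT: «x y␤»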

=end pod

# vim: expandtab softtabstop=4 shiftwidth=4 ft=perl6
